6e75016c45c602c874086dea26324ca413f0c141,external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java,SingleTopicKafkaSpoutTest,shouldEmitAllMessages,#,132

Before Change


    /**
     * Calls {@code nextTuple()} on the spout {@code messageCount} times. After each call it
     * verifies that the collector was asked to emit, on the configured stream, a tuple made of
     * the configured topic followed by the iteration index rendered as a string twice; the
     * captured message id is then acked. Finally, every entry in the spout's {@code acked} map
     * is checked with {@code assertOffsetCommitted} against {@code messageCount - 1}.
     */
    @Test
    public void shouldEmitAllMessages() throws Exception {
        final int messageCount = 10;
        final SpoutContext context = initializeSpout(messageCount);
        final int lastOffset = messageCount - 1;

        for (int index = 0; index < messageCount; index++) {
            context.spout.nextTuple();

            // A fresh captor per iteration picks up the message id passed to this emit call.
            final ArgumentCaptor<Object> idCaptor = ArgumentCaptor.forClass(Object.class);
            final String expected = Integer.toString(index);
            verify(context.collector).emit(
                    eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                    eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, expected, expected)),
                    idCaptor.capture());

            final Object emittedId = idCaptor.getValue();
            context.spout.ack(emittedId);

            // Reset the collector mock before the next iteration's verify.
            reset(context.collector);
        }

        context.spout.acked.values().forEach(
                entry -> assertOffsetCommitted(lastOffset, (KafkaSpout.OffsetEntry) entry));
    }

    @Test

After Change


    public void shouldEmitAllMessages() throws Exception {
        try (SimulatedTime simulatedTime = new SimulatedTime()) {
            int messageCount = 10;
            initializeSpout(messageCount);

            //Emit all messages and check that they are emitted. Ack the messages too
            IntStream.range(0, messageCount).forEach(value -> {